[Java] CountDownLatch 与 CyclicBarrier


CountDownLatch

CountDownLatch是什么

源码注释描述如下:

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

它是一个同步工具类,允许一个或多个线程一直等待,直到其他线程运行完成后再执行。

看下我遇到的代码:

final class DecodeThread extends Thread {

    private final MainActivity mActivity;
    private final CountDownLatch mHandlerInitLatch;
    private Handler mHandler;

    DecodeThread(MainActivity activity) {
        this.mActivity = activity;
        mHandlerInitLatch = new CountDownLatch(1);
    }

    Handler getHandler() {
        try {
            mHandlerInitLatch.await();
        } catch (InterruptedException ie) {
            // continue?
        }
        return mHandler;
    }

    @Override
    public void run() {
        Looper.prepare();
        mHandler = new DecodeHandler(mActivity);
        mHandlerInitLatch.countDown();
        Looper.loop();
    }
}

CountDownLatch 工作原理

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量(上面代码构造线程数为1)。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

CountDownLatch的构造函数中的count就是闭锁需要等待的线程数量。这个值只能被设置一次,而且不能重新设置。

主线程必须在启动其他线程后立即调用CountDownLatch.await方法,这样主线程就会在这个方法上阻塞,知道其他线程完成各自任务。

其他线程完成任务后必须各自通知CountDownLatch对象,使其调用countDown方法。当count值为0时,主线程就能通过await方法恢复自己的任务。

简述其执行流程:

  1. 运行主线程
  2. 创建N个线程的CountDownLatch
  3. 创建启动N个线程
  4. 主线程运行CountDownLatch.await(),等待latch
  5. N个线程运行完毕,latch计数减到0
  6. 主线程恢复运行

再看上面的代码,主线程在getHandler处等待latch,在run处创建handler后执行latch.countDown,就是为了在get的时候能拿到非空的handler。

使用场景

  1. 实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
  2. 开始执行前等待n个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了。
  3. 死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。

例子

网上看的一个例子

// 基础checker类
import java.util.concurrent.CountDownLatch;

public abstract class BaseHealthChecker implements Runnable {

    private CountDownLatch _latch;
    private String _serviceName;
    private boolean _serviceUp;

    // Get latch object in constructor so that after completing the task, thread
    // can countDown() the latch
    public BaseHealthChecker(String serviceName, CountDownLatch latch) {
        // TODO Auto-generated constructor stub
        super();
        this._latch = latch;
        this._serviceName = serviceName;
        this._serviceUp = false;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            verifyService();
            _serviceUp = true;
        } catch (Throwable t) {
            // TODO: handle exception
            t.printStackTrace(System.err);
            _serviceUp = false;
        } finally {
            if (_latch != null) {
                _latch.countDown();
            }
        }
    }

    public String getServiceName() {
        return _serviceName;
    }

    public boolean isServiceUp() {
        return _serviceUp;
    }

    // This methos needs to be implemented by all specific service checker
    public abstract void verifyService();
}

然后创建三个checker,分别是CacheChecker,DatabaseChecker和NetworkChecker

import java.util.concurrent.CountDownLatch;


public class CacheHealthChecker extends BaseHealthChecker {

    public CacheHealthChecker (CountDownLatch latch)  {
        super("Cache Service", latch);
    }

    @Override
    public void verifyService()
    {
        System.out.println("Checking " + this.getServiceName());
        try
        {
            Thread.sleep(6000);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}

创建一个启动类,处理checker的检测

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;


public class ApplicationStartupUtil {
    //List of service checkers
    private static List _services;

    //This latch will be used to wait on
    private static CountDownLatch _latch;

    private ApplicationStartupUtil()
    {
    }

    private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();

    public static ApplicationStartupUtil getInstance()
    {
        return INSTANCE;
    }

    public static boolean checkExternalServices() throws Exception
    {
        //Initialize the latch with number of service checkers
        _latch = new CountDownLatch(3);

        //All add checker in lists
        _services = new ArrayList();
        _services.add(new NetworkHealthChecker(_latch));
        _services.add(new CacheHealthChecker(_latch));
        _services.add(new DatabaseHealthChecker(_latch));

        //Start service checkers using executor framework
        Executor executor = Executors.newFixedThreadPool(_services.size());

        for(final BaseHealthChecker v : _services)
        {
            executor.execute(v);
        }

        //Now wait till all services are checked
        _latch.await();

        //Services are file and now proceed startup
        for(final BaseHealthChecker v : _services)
        {
            if( ! v.isServiceUp())
            {
                return false;
            }
        }
        return true;
    }
}

在主程序中运行检测

public class main {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        boolean result = false;
        try {
            result = ApplicationStartupUtil.checkExternalServices();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("External services validation completed !! Result was :: "+ result);

    }
}

运行结果:

Checking Network Service
Checking Cache Service
Checking Database Service
Database Service is UP
Cache Service is UP
Network Service is UP
External services validation completed !! Result was :: true

CyclicBarrier

CyclicBarrier 定义

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

也是一个同步工具类,它是让一组线程相互等待进入barrier状态,然后这组线程再执行。

初看定义可能有些懵,看个示例代码就清楚了。

示例

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i

示例中创建了监控4个线程的CyclicBarrier(4),然后启动4个线程,每个线程都持有cyclicBarrier对象。

在每个线程执行一半时,运行cyclicBarrier.await(),此时CyclicBarrier就会进行+1计数,当前线程被阻塞。当计数达到4时,解除阻塞,所有线程都继续执行。

所以运行结果是:

线程Thread-0正在写入数据…
线程Thread-3正在写入数据…
线程Thread-2正在写入数据…
线程Thread-1正在写入数据…
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务…
所有线程写入完毕,继续处理其他任务…
所有线程写入完毕,继续处理其他任务…
所有线程写入完毕,继续处理其他任务…

要点

  • 构造方法有两种

CyclicBarrier(int parties)
默认构造方法,参数表示拦截的线程数量。

CyclicBarrier(int parties, Runnable barrierAction)
由于线程之前的调度是由CPU决定的,所以默认的构造方法无法设置线程执行优先级,CyclicBarrier提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达同步点时,优先执行线程barrierAction,这样可以更加方便的处理一些负责的业务场景。

  • await实现

CyclicBarrier同样提供带超时时间的await和不带超时时间的await。如果指定了时间,在时间内某个线程还未await,就抛出异常,所有线程继续执行后续任务。

  • reset功能

reset可以使其不断地复用

两者区别

CountDownLatch CyclicBarrier
减计数方式 加计数方式
计算为0时释放所有等待的线程 计数达到指定值时释放所有等待线程
计数为0时,无法重置 计数达到指定值时,计数置为0重新开始
调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响 调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞
不可重复利用 可重复利用

文章作者: Wossoneri
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC 4.0 许可协议。转载请注明来源 Wossoneri !
评论
  目录